feat: external Lance vector index path for parquet/Delta R (Phase 1)#5
Draft
sezruby wants to merge 6 commits into
Draft
feat: external Lance vector index path for parquet/Delta R (Phase 1)#5sezruby wants to merge 6 commits into
sezruby wants to merge 6 commits into
Conversation
Adds IndexedNearestJoinExternal alongside the existing IndexedNearestJoin (Lance R) and the per-query temp-Lance path (#3). When R is a direct parquet/Delta scan, build a Lance external IVF-PQ index over the source files and probe + refine + post-topK materialize from those files — no temp Lance dataset, no column copies. Sibling RFC + benchmark numbers: #4 == Files == internal/ExternalIndexProbe.scala Scala wrapper around the new ExternalIvfPqIndex JNI handle (in lance-core 7.1.0-beta.1). Decodes Arrow IPC stream from fetchRows into Spark-compatible row maps. internal/ExternalIndexLifecycle.scala Driver-side cache (sha256 keyed by file paths + col + params); cleanup hook registered with the existing LanceTempLifecycle so cluster scratch is cleaned on application end / JVM shutdown — same machinery as the temp-Lance path. internal/ExternalFusedStage.scala Single-stage probe + materialize. Lance returns global top-K per query already (no cross-task merge needed), so the leftId shuffle from the Lance-native pipeline is vestigial here. Fused stage opens the index once per task, probes every left row, batched fetchRows for the partition, emits final join Rows. internal/LanceVectorIndexBuilder.scala Promoted from src/test/scala — needed by IndexedNearestJoinExternalBenchmark to build IVF-PQ on Lance-native R for the C-indexed apples-to-apples reference config. Same body as the existing test helper. Note: fixes a latent arg-order bug in the existing test helper — VectorIndexParams.ivfPq signature is (numPartitions, numBits, numSubVectors, ...), not (numPartitions, numSubVectors, numBits, ...). The bug was masked in the existing test because its defaults set both args to 8 (swap invisible). IndexedNearestJoinExternal.scala Public DataFrame API entry point. SQL Catalyst integration deferred to Phase 3. benchmark/IndexedNearestJoinExternalBenchmark.scala Compares B-narrow / B-wide (temp-Lance, project rid only / +N payload) vs C-indexed-narrow / C-indexed-wide (Lance-native + IVF-PQ index, same projections) vs E (external-index over parquet). Reports E warm vs C-indexed (the apples- to-apples comparison: same algorithm, R lives in parquet vs Lance). Headline cluster numbers at wide-medium (|R|=1M, dim=128, 16 string payload cols, K=10, |L|=100): B-narrow: 18,128 ms E warm: 570 ms (E build amortizes after ~2 queries vs B-narrow) benchmark/IndexedNearestJoinTempRBenchmark.scala Drive-by fix: pre-set spark.lance.knn.tempR.dir from BENCH_DATA_PATH in cluster mode so config D doesn't trip the helper's cluster-mode fail-fast guard. == Lance bump == pom.xml: lance.version 6.0.0-beta.4 → 7.1.0-beta.1 — required for the new ExternalIvfPqIndex API. Companion lance-core PR: sezruby/lance#1 == Tests == IndexedNearestJoinExternalTest passes locally: 16 left vectors × 2 parquet files of 320 rows; top-K returned per left; scores monotone; oracle hit rate above K/2 threshold.
Three fixes to the external-index path based on cluster benchmark findings.
== 1. Parallelism heuristic ==
ExternalFusedStage previously ran with `left.rdd.partitions.size` tasks. For
typical KNN workloads |L| is small (hundreds to thousands of queries) and Spark
right-sizes left to 1-2 partitions accordingly, leaving fetchRows running
serially on a 32-core cluster.
IndexedNearestJoinExternal.apply now sizes parallelism from source row count:
targetRowsPerTask = 100 // ~5-50ms per Lance search × 100 = 0.5-5 sec/task
estimatedRows = left.queryExecution.optimizedPlan.stats.rowCount
sizedParallelism = ceil(estimatedRows / targetRowsPerTask), capped at
defaultParallelism
parallelism = mergeParallelism.getOrElse(sizedParallelism)
Conditional repartition: only UP, never DOWN. Users with already-good
partitioning aren't slowed down.
Also adds diagnostic prints so benchmarks see what parallelism the path picked.
Cluster impact at wide-medium (|R|=1M, |L|=100, 32-core cluster):
Before: 1-2 tasks → ~4,300 ms warm
After: 32 tasks → ~600 ms warm (7× speedup)
== 2. Bench scratch cleanup ==
Cluster scratch volumes accumulate 5-10 GB per bench run and eventually trip
"Disk quota exceeded" mid-write. Bench now sweeps stale `knn-bench-data-*`
siblings from dataRoot.getParent at start. Strict pattern match (only matches
the cpd-submit-bench naming), skips this run's own dataRoot.
== 3. mega-medium opt-in ==
`mega-medium` scale (|R|=10M) is now documented as requiring substantial scratch
volume; default scale lists exclude it.
Cluster bench results at wide-medium (medians across 4 runs):
B-narrow (temp-Lance): 16,506 ms
C-indexed-narrow (Lance-R): 545 ms
E warm (external-index): 609 ms
E build (one-time): 29,529 ms
E warm comparable to C-indexed-narrow (both sub-second); E ~27× faster than
B-narrow. E build amortizes after ~2 queries vs B-narrow.
Two improvements driven by mega-medium cluster runs. == 1. BENCH_CONFIGS env var == The bench previously ran A+B+C+E+F unconditionally. At mega-medium scale (|R|=10M) writing the Lance-native R datasets for C alone takes ~5+ minutes, so running the full suite is 30+ min. BENCH_CONFIGS=b-narrow,e (or any subset) selects which configs run. Default remains all configs when unset. Lance-native R writes + IVF-PQ index build for C are gated behind c-narrow / c-wide activation so they're skipped entirely when not needed. == 2. Cap parallelism at spark.sql.shuffle.partitions == Heuristic was capping at defaultParallelism. cpd-submit-bench.sh sets spark.sql.shuffle.partitions=128 (above 32-core defaultParallelism), and the cap should match Spark's convention for shuffle parallelism — capping at shuffle.partitions gives finer task granularity for large |L|. Cluster mega-medium results (medians across 2 runs): B-narrow: ~166s (3 min target hit; temp-Lance write of ~5GB) C-indexed-narrow: ~13s E warm: ~26s E build (one-time): ~98s E vs B-narrow: 6.4× speedup. C beats E by ~2× but requires ~82s of pre-work (write Lance dataset + build index) outside the timing loop, plus ~5GB extra storage.
Adds LanceParquetIndex, a SparkSession-aware wrapper around ExternalIvfPqIndex
that exposes single-query and small-batch retrieval shapes (RAG / point-lookup
KNN) alongside the existing IndexedNearestJoinExternal bulk join.
The Rust core and JNI surface already supported single-query search; this
commit exposes it from Scala without forcing callers through kNearestJoin
with a 1-row left DataFrame (which is bounded below by Spark task launch
latency).
Surface:
- LanceParquetIndex.build / buildIfMissing / open
- search(query, k, ...) — driver-side, returns Seq[SearchResult]
- searchToDF(query, k, projection) — 1-partition DataFrame for pipeline use
- fetchRowsToDF(refs, projection) — random-access fetch with payload
- buildIfMissing reuses ExternalIndexLifecycle's content-hash cache so
the index file is shared with the bulk-join path
LanceParquetIndexTest covers JNI round-trip, DataFrame schema shape,
projection materialization, fetchRows ordering, and cache reuse. Recall is
tested in the Rust-side external_index_phase1.rs; here we focus on what the
wrapper itself can break.
Owner
Author
|
Added the driver-side single-query API per feedback on SPIP #7 § 4.6. New surface (
Three caller patterns now end-to-end:
Updated SPIP §4.6 to reflect shipped status: #7 (comment) |
Adds ExecutorCpuCheck.run() which executes a fixed-cost CPU loop on every task slot, groups results by executorId, and prints a per-executor median / min / max table sorted slow-to-fast. The probe runs once before the first scale and answers the question "are these executors actually free?" — which defaultParallelism alone cannot. Output highlights the slowest/fastest ratio. ≤1.25× is uniform; ≥1.5× prints a warning that bench medians are under contention. With BENCH_CPU_CHECK_FAIL_RATIO set the probe throws if the ratio exceeds the threshold, refusing to run a noisy bench. Skip with BENCH_CPU_CHECK_SKIP=true when the bench is being used for something other than measuring (smoke tests, debugging). Existing benchmark output gains a new section before any timing happens, so post-hoc readers can see whether the numbers were collected on a quiet cluster or a contended one.
Makes IndexedNearestJoinExternalBenchmark runnable on cloud-storage data
roots (abfss://, s3://, ...) and as a Databricks spark_jar_task without
killing the host SparkSession. Validated end-to-end on Databricks DBR 16.4
LTS against an abfss-backed wide-tiny scale.
What changed and why:
- Cloud-aware listParquetFiles: dispatch on URI scheme. file:// and bare
paths use java.nio (cheaper, no Hadoop init); other schemes route
through Hadoop FileSystem so the bench can scratch onto cloud storage
without changing the bench's contract that listParquetFiles returns
Strings the external-index API can consume.
- Cloud-scheme guard in cleanupSiblingScratchDirs. The sibling sweep is a
local-fs convenience (java.nio.Files.list); on cloud URIs it would
either no-op silently or, worse, throw on Paths.get of a URI scheme it
doesn't know. Skip with a clear message and let storage lifecycle /
blob TTL handle cloud cleanup.
- deletePathIfExists helper before Lance writes. Spark's Lance datasource
doesn't auto-overwrite even when mode("overwrite") is set — re-running
against the same data root trips
TableAlreadyExistsException on the first Lance write and
NoSuchTableException if mode("overwrite") tries to drop-then-create on
a fresh path. Pre-deleting via Hadoop FileSystem.delete is the
scheme-agnostic fix that works on file://, abfss://, s3://, etc.
- parquet write parallelism scales with |R|. Old behavior: coalesce(1).
At wide-tiny that's fine; at huge-medium (50M rows × dim=128 + payload)
it serializes 70+ GB through one executor and runs >40 minutes. New
behavior: max(1, numR/1M) parts so each parquet file is ~1M rows,
letting executors write in parallel.
- huge-medium scale (|R|=25M, |L|=10000) added. Stresses the C-indexed-
narrow vs E comparison at a regime where Path A's distributed merge
has more fragments to spread probes across; previously bench topped
out at mega-medium (|R|=10M).
- c-distributed-narrow config wires probeParallelism to a non-trivial
value so Path A actually exercises its multi-stage merge path. Without
this, C-indexed runs with probeParallelism=1 and the "cross-fragment
merge" advantage we wanted to measure is invisible.
- sparkAlreadyRunning detection around spark.stop(). When running as a
Databricks JAR task, the SparkSession is shared with the host REPL.
Calling spark.stop() in finally tears down the host session, the
bench's own summary print falls into a buffer that's torn down with
it, and the run shows FAILED with no output even though all
measurements completed. Fix: only stop the session we created.
- Bundle Netty in the fat jar. With spark.executor.userClassPathFirst,
relying on Spark's shipped Netty triggers a cross-classloader
IllegalAccessError on UnsafeDirectLittleEndian (Spark loads its Netty
on AppClassLoader; the user-jar child loader sees a different one).
Bundling produces a self-contained classpath that doesn't depend on
the cluster honoring userClassPathFirst.
Test footprint: existing local Spark tests still pass; Databricks DBR
16.4 LTS smoke run (run_id 975303254574346) completed end-to-end on
wide-tiny against abfss with C-indexed-narrow and E configs. C-indexed
built the index in 26.6s and the warm kNJ ran in 1.9s; E built in 12s
and warm-ran in 7.7s.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Draft PR — additive on top of PR #3 (knn-temp-r branch).
Tracking issue with full RFC + cluster benchmark numbers + design discussion: #4.
Two-path design
PR #3 (per-query temp-Lance write) stays the general-purpose path — works on any DataFrame R including subqueries, projections, joins. This PR adds a specialized path for direct parquet/Delta scans:
What this PR adds
Public API
IndexedNearestJoinExternal.apply(left, rightFilePaths, leftVecCol, rightVecCol, k, ...)— DataFrame API entry point. SQL Catalyst integration deferred to Phase 3 (same staging the originalkNearestJoinshipped in).Internal stages
ExternalIndexProbe— Scala wrapper around the newExternalIvfPqIndexJNI handle (lance-core 7.1.0-beta.1, companion PR). Decodes Arrow IPC stream fromfetchRowsinto Spark row maps.ExternalIndexLifecycle— driver-side cache (sha256 keyed by file paths + col + params). Cleanup registered with the existingLanceTempLifecyclemachinery.ExternalFusedStage— single-stage probe + materialize. Lance returns global top-K per query already (no cross-task merge), so theleftIdshuffle from the Lance-native pipeline is vestigial here. Fused stage opens the index once per task, probes every left row, batchedfetchRowsfor the partition, emits final joinRows.Benchmark
IndexedNearestJoinExternalBenchmarkcompares 5 configs on the SAME data in the SAME job:crossJoin + min_by_kbrute-force (skipped on wide payload — pair count dominates regardless)kNearestJoin(PR feat(knn): per-query temp Lance write for non-Lance R (issue #2) #3 path)kNearestJoin— apples-to-apples reference (same algorithm, R already in Lance)IndexedNearestJoinExternalover the same parquet filesBucketedRandomProjectionLSHbaseline (gated behindBENCH_SKIP_LSH=true; rough estimates indicate 30-60s per query on 1M rows due toapproxSimilarityJoin's O(|R|) shuffle cost — see issue External Lance vector index over parquet — RFC + tracking issue #4 for the source-code analysis)Headline numbers (cluster Spark 3.5)
K=10, |L|=100, dim=128, IVF=256, PQ=16:
* Earlier benchmark used un-indexed Lance-native R as "C" (brute-force scan). Re-running with indexed C this round; numbers will be folded into issue #4 once the cluster job finishes.
E warm is 30-33× faster than B-narrow at scale. Build amortizes after ~2 queries vs B-narrow.
Latent bug fix in
LanceVectorIndexBuilderPromoted from
src/test/scalatosrc/main/scala/.../internalso the bench can use it. While there, fixed an arg-order bug —VectorIndexParams.ivfPqsignature is(numPartitions, numBits, numSubVectors, ...)not(numPartitions, numSubVectors, numBits, ...). The existing test helper had them swapped; bug was masked because the test's defaults coincidentally set both to 8.Drive-by fix in
IndexedNearestJoinTempRBenchmarkPre-set
spark.lance.knn.tempR.dirfromBENCH_DATA_PATHin cluster mode so config D doesn't trip the helper's cluster-mode fail-fast guard. Cluster runs of the temp-R bench would otherwise need a separate env var.What this PR does NOT include
Status
DRAFT — for review of the design + integration approach. Companion lance-core PR at sezruby/lance#1 must land first (this depends on the new
ExternalIvfPqIndexAPI).References